package com.fwmagic.dynamic_rule.functions;

import com.fwmagic.dynamic_rule.bean.LogBean;
import com.fwmagic.dynamic_rule.bean.ResultBean;
import com.fwmagic.dynamic_rule.bean.RuleAtomicParam;
import com.fwmagic.dynamic_rule.bean.RuleParam;
import com.fwmagic.dynamic_rule.service.UserActionCountQueryService;
import com.fwmagic.dynamic_rule.service.UserActionSequenceQueryService;
import com.fwmagic.dynamic_rule.service.UserProfileQueryService;
import com.fwmagic.dynamic_rule.service.impl.*;
import com.fwmagic.dynamic_rule.utils.RuleSimulator;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

/**
 * Core rule-processing function, V2 (Flink state combined with ClickHouse).
 *
 * <p>Every incoming event is appended to a keyed {@link ListState}. When an event's id equals the
 * rule's trigger event id, the rule's conditions are evaluated in this order: user profile,
 * action counts, action sequence. Conditions whose range start is older than the retention window
 * are sent to the ClickHouse-backed query services; all others are sent to the services that were
 * constructed on top of the list state. A {@link ResultBean} is emitted only when every condition
 * passes.
 */
public class RuleProcessFunctionV2 extends KeyedProcessFunction<String, LogBean, ResultBean> {

    /** Retention window, in hours, used both for the state TTL and for the near/far split. */
    private static final long STATE_RETENTION_HOURS = 2L;

    /** The same retention window expressed in milliseconds. */
    private static final long STATE_RETENTION_MILLIS = STATE_RETENTION_HOURS * 60L * 60L * 1000L;

    /** Detail events received for the current key. */
    private ListState<LogBean> listState;

    /** Evaluates the user-profile condition. */
    private UserProfileQueryService userProfileQueryService;

    /** Evaluates action-count conditions against {@link #listState}. */
    private UserActionCountQueryService userActionCountQueryService;

    /** Evaluates action-sequence conditions against {@link #listState}. */
    private UserActionSequenceQueryService userActionSequenceQueryService;

    /** Evaluates action-count conditions through the ClickHouse implementation. */
    private UserActionCountQueryService userActionCountQueryClickHouseService;

    /** Evaluates action-sequence conditions through the ClickHouse implementation. */
    private UserActionSequenceQueryService userActionSequenceQueryClickHouseService;

    /**
     * Registers the event list state (with a TTL equal to the retention window) and constructs
     * the query services used by {@link #processElement}.
     *
     * @param parameters the Flink configuration passed to this function; not read here
     * @throws Exception if state registration or service construction fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // State that holds the raw detail events.
        ListStateDescriptor<LogBean> stateDescriptor = new ListStateDescriptor<>("events", LogBean.class);

        // Entries expire after the retention window, measured from creation/last write.
        StateTtlConfig stateTtlConfig = StateTtlConfig
                .newBuilder(Time.hours(STATE_RETENTION_HOURS))
                .updateTtlOnCreateAndWrite()
                .build();
        stateDescriptor.enableTimeToLive(stateTtlConfig);
        listState = getRuntimeContext().getListState(stateDescriptor);

        // Profile service plus the two services that read from the list state.
        userProfileQueryService = new UserProfileQueryServiceHbaseImpl();
        userActionCountQueryService = new UserActionCountQueryServiceStateImpl(listState);
        userActionSequenceQueryService = new UserActionSequenceQueryServiceStateImpl(listState);

        // ClickHouse-backed services for conditions that reach further back in time.
        userActionCountQueryClickHouseService = new UserActionCountQueryServiceClickHouseImpl();
        userActionSequenceQueryClickHouseService = new UserActionSequenceQueryServiceClickHouseImpl();
    }

    /**
     * Stores the event in state and, if it triggers the rule, evaluates all rule conditions and
     * emits a {@link ResultBean} when they all pass.
     *
     * @param logBean   the incoming event
     * @param context   the Flink process-function context; not used
     * @param collector receives one {@link ResultBean} per fully matched rule evaluation
     * @throws Exception if state access or any query service fails
     */
    @Override
    public void processElement(LogBean logBean, Context context, Collector<ResultBean> collector) throws Exception {
        // Always record the event, whether or not it triggers the rule.
        listState.add(logBean);

        RuleParam ruleParam = RuleSimulator.getRuleParam();

        // 1. Trigger condition: only the rule's trigger event starts an evaluation.
        if (!ruleParam.getTriggerParam().getEventId().equals(logBean.getEventId())) {
            return;
        }

        // 2. User-profile condition.
        if (!userProfileQueryService.judgeProfileCondition(logBean.getDeviceId(), ruleParam)) {
            return;
        }

        System.out.println("规则计算被触发:" + logBean.getDeviceId() + ", " + logBean.getEventId());

        // Conditions starting before this instant go to ClickHouse, the rest to Flink state.
        long splitTime = System.currentTimeMillis() - STATE_RETENTION_MILLIS;

        // 3. Action-count conditions.
        if (!matchesActionCounts(logBean, ruleParam, splitTime)) {
            return;
        }

        // 4. Action-sequence condition.
        if (!matchesActionSequence(logBean, ruleParam, splitTime)) {
            return;
        }

        // Every condition passed: emit the match.
        ResultBean resultBean = new ResultBean();
        resultBean.setTimeStamp(logBean.getTimeStamp());
        resultBean.setDeviceId(logBean.getDeviceId());
        resultBean.setRuleId(ruleParam.getRuleId());
        collector.collect(resultBean);
    }

    /**
     * Evaluates the rule's action-count conditions, routing each one to Flink state or ClickHouse
     * depending on whether its range start lies before {@code splitTime}.
     *
     * <p>The query services take the whole {@link RuleParam}, so the rule's count-condition list
     * is temporarily replaced with the relevant subset and restored before returning.
     *
     * @param logBean   the triggering event; supplies the device id for the ClickHouse query
     * @param ruleParam the rule being evaluated
     * @param splitTime epoch-millisecond boundary between "far" and "near" conditions
     * @return {@code true} if there are no count conditions or all of them pass
     * @throws Exception if a query service fails
     */
    private boolean matchesActionCounts(LogBean logBean, RuleParam ruleParam, long splitTime) throws Exception {
        List<RuleAtomicParam> allCountParams = ruleParam.getUserActionCountParam();
        // A rule without count conditions trivially satisfies this step.
        if (allCountParams == null || allCountParams.isEmpty()) {
            return true;
        }

        ArrayList<RuleAtomicParam> nearRuleAtomicParams = new ArrayList<>();
        ArrayList<RuleAtomicParam> farRuleAtomicParams = new ArrayList<>();
        for (RuleAtomicParam ruleAtomicParam : allCountParams) {
            if (ruleAtomicParam.getRangeStart() < splitTime) {
                farRuleAtomicParams.add(ruleAtomicParam);
            } else {
                nearRuleAtomicParams.add(ruleAtomicParam);
            }
        }

        try {
            if (!nearRuleAtomicParams.isEmpty()) {
                System.out.println("查询Flink的State数据");
                ruleParam.setUserActionCountParam(nearRuleAtomicParams);
                // The state-backed service is called with an empty device id, as before.
                // NOTE(review): presumably keyed state already scopes the data to the current
                // key, making the id unnecessary - confirm against the service implementation.
                if (!userActionCountQueryService.queryActionCounts("", ruleParam)) {
                    return false;
                }
            }

            if (!farRuleAtomicParams.isEmpty()) {
                System.out.println("查询ClickHouse的数据");
                ruleParam.setUserActionCountParam(farRuleAtomicParams);
                if (!userActionCountQueryClickHouseService.queryActionCounts(logBean.getDeviceId(), ruleParam)) {
                    return false;
                }
            }
            return true;
        } finally {
            // Put the full list back so the rule object is not left holding only a subset.
            // NOTE(review): this matters if RuleSimulator hands out a shared instance - confirm.
            ruleParam.setUserActionCountParam(allCountParams);
        }
    }

    /**
     * Evaluates the rule's action-sequence condition against ClickHouse or Flink state.
     *
     * <p>Only the first sequence step's range start is inspected to choose the backend; the
     * original code comment states that all steps of a sequence share the same start and end time.
     *
     * @param logBean   the triggering event; supplies the device id for the ClickHouse query
     * @param ruleParam the rule being evaluated
     * @param splitTime epoch-millisecond boundary between "far" and "near" conditions
     * @return {@code true} if there is no sequence condition or the sequence condition passes
     * @throws Exception if a query service fails
     */
    private boolean matchesActionSequence(LogBean logBean, RuleParam ruleParam, long splitTime) throws Exception {
        List<RuleAtomicParam> userActionSequenceParam = ruleParam.getUserActionSequenceParam();
        // Guard on the sequence list itself (the earlier code checked the count list here).
        if (!CollectionUtils.isNotEmpty(userActionSequenceParam)) {
            return true;
        }

        RuleAtomicParam firstStep = userActionSequenceParam.get(0);
        if (firstStep.getRangeStart() < splitTime) {
            System.out.println("查询ClickHouse的数据");
            return userActionSequenceQueryClickHouseService.queryActionSequence(logBean.getDeviceId(), ruleParam);
        }

        System.out.println("查询Flink的State数据");
        // Empty device id, matching the count query against state above.
        return userActionSequenceQueryService.queryActionSequence("", ruleParam);
    }
}
